Published on

BI 工具中跨数据源 JOIN 设计

Authors
  • avatar
    Name
    Wang Zhiwei
    Twitter

FineBI

通过自助数据集实现多源(不同数据源) JOIN。

例如:

有数据源 A 连接 MySQL,有数据源 B 连接 PostgreSQL,数据集 a 为查询 A 得到,数据集 b 为查询 B 得到,通过在数据集 a、b 上执行 JOIN 查询间接实现了同时在 MySQL 和 PostgreSQL 上不同表的 JOIN。

FineBI 并未开源,见 https://help.fanruan.com/finebi/doc-view-663.html 中描述的数据量限制可以猜测其跨数据源 JOIN 的实现是对自助数据集进行持久化到本地缓存,存储引擎为某种支持 SQL 的数据库,将来自不同类型数据库的数据缓存到相同的数据库中,即可对其进行 JOIN 查询。

https://help.fanruan.com/finebi/doc-view-512.html

Redash

基于 Query Results Data Source (QRDS) 实现多源 JOIN。

Redash 的 QRDS 类似 FineBI 的自助数据集,QRDS 只保存查询语句不保存查询结果,为惰性查询。QRDS 由普通的查询语句生成,比如 sql_1 SELECT a,b,c,d FROM t1 可以保存为 QRDS qrds_1,使用 qrds_1 作为数据源查询 SELECT a,b FROM qrds_1,会先执行 sql_1 并将查询结果缓存到 sqlite 内存临时表 t_tmp 中,再进行查询,即转换为查询 SELECT a,b FROM t_tmp

基于以上特性,可以理解 Redash 的多源 JOIN 实现原理,缓存 QRDS 查询结果到 sqlite 内存临时表中,再基于这些临时表进行 JOIN 查询。

实现部分源码:

# 从查询语句创建 sqlite 的临时表
def create_tables_from_query_ids(user, connection, query_ids):
   for query_id in set(query_ids):
        # 取出查询结果
        results = get_query_results(user, query_id, False)
        # QRDS 临时表名
        table_name = "query_{query_id}".format(query_id=query_id)
        # 创临时表并写入查询结果数据
        create_table(connection, table_name, results)


# 创建临时表并写入数据
def create_table(connection, table_name, query_results):
    try:
        columns = [column["name"] for column in query_results["columns"]]
        safe_columns = [fix_column_name(column) for column in columns]

        column_list = ", ".join(safe_columns)
        # 生成建表语句
        create_table = "CREATE TABLE {table_name} ({column_list})".format(
            table_name=table_name, column_list=column_list
        )
        logger.debug("CREATE TABLE query: %s", create_table)
        connection.execute(create_table)
    except sqlite3.OperationalError as exc:
        raise CreateTableError(
            "Error creating table {}: {}".format(table_name, str(exc))
        )

    insert_template = "insert into {table_name} ({column_list}) values ({place_holders})".format(
        table_name=table_name,
        column_list=column_list,
        place_holders=",".join(["?"] * len(columns)),
    )
		# 逐条写入数据
    for row in query_results["rows"]:
        values = [flatten(row.get(column)) for column in columns]
        connection.execute(insert_template, values)


class Results(BaseQueryRunner):
    ...

    def run_query(self, query, user):
        """
				:query: 在 QRDS 上的查询语句
				...

				"""
        # sqlite 内存模式访问
        connection = sqlite3.connect(":memory:")

        query_ids = extract_query_ids(query)
        create_tables_from_query_ids(user, connection, query_ids)

        cursor = connection.cursor()

        try:
            cursor.execute(query)

            if cursor.description is not None:
                columns = self.fetch_columns([(i[0], None) for i in cursor.description])

                rows = []
                column_names = [c["name"] for c in columns]

                for i, row in enumerate(cursor):
                    for j, col in enumerate(row):
                        guess = guess_type(col)

                        if columns[j]["type"] is None:
                            columns[j]["type"] = guess
                        elif columns[j]["type"] != guess:
                            columns[j]["type"] = TYPE_STRING

                    rows.append(dict(zip(column_names, row)))

                data = {"columns": columns, "rows": rows}
                error = None
                json_data = json_dumps(data)
            else:
                error = "Query completed but it returned no data."
                json_data = None
        except (KeyboardInterrupt, JobTimeoutException):
            connection.cancel()
            raise
        finally:
            connection.close()
        return json_data, error

https://github.com/getredash/redash/issues/2527

https://redash.io/help/user-guide/querying/query-results-data-source

https://sourcegraph.com/github.com/getredash/redash@143d22db04a9058966b8c7d678b06f228b937326/-/blob/redash/query_runner/query_results.py?L70:5